use super::{const_buf, mut_buf, Fd};
use crate::runtime::{sched_from_ctx, task_from_ctx, worker_from_ctx, TaskRef};
use crate::{
    event::{Event, Scheduler, POLLET, POLLIN, POLLOUT},
    Error,
};
use core::future::Future;
use core::ops::Deref;
use core::pin::Pin;
use core::ptr::{self, NonNull};
use core::task::{Context, Poll};
use hioff::container_of_mut;

/// 使用约束:
/// 1. 同一个Fd在一个工作线程只能有一个AioFd实例, 要求异步读写都只能在一个异步任务中完成.
/// 2. 如果必须将异步读写分离或者需要多个读写操作，需要利用Fd::clone复制Fd来实现.
#[repr(C)]
pub struct AioFd<'a> {
    pub(crate) fd: &'a Fd,
    event: Event,
    events: u32,
    task: Option<TaskRef>,
    waker: Option<TaskRef>,
    sched: Option<NonNull<Scheduler>>,
}

// Fd支持Sync，当前实现的Scheduler支持Sync，可以跨线程del_fd_event
unsafe impl Send for AioFd<'_> {}
unsafe impl Sync for AioFd<'_> {}

impl<'a> AioFd<'a> {
    /// 同一个Fd在一个工作线程中只能使用一个AioFd，如果业务上必须要多个，可以使用Fd::clone复制新的Fd来实现.
    pub fn new(fd: &'a Fd) -> Self {
        Self {
            fd,
            event: Event::new(Self::event_handle),
            task: None,
            waker: None,
            sched: None,
            events: 0,
        }
    }
}

impl AioFd<'_> {
    /// 内部实际注册的IO事件为`events | POLLET`,
    /// 因此使用人员应该确保仅在调用`try_`系列接口返回EAGAIN时才调用此接口.
    /// 此接口一旦调用，当前异步任务就绑定在当前工作线程中运行.
    pub async fn wait(&mut self, events: u32) -> Result<(), Error> {
        FdWait::new(self, events).await
    }

    /// 读取的数据填满buf后才返回，除非对端断链，此时返回Ok(size)将小于buf的长度.
    pub async fn read_all(&mut self, mut buf: &mut [u8]) -> Result<usize, Error> {
        let mut recved = 0;
        loop {
            let ret = unsafe { libc::read(self.fd.fd, mut_buf(buf), buf.len()) };
            // 频繁调用场景，但rust无法指定分支预测功能
            #[allow(clippy::comparison_chain)]
            if ret > 0 {
                let n = ret as usize;
                recved += n;
                if n == buf.len() {
                    return Ok(recved);
                }
                buf = &mut buf[n..];
            } else if ret == 0 {
                return Ok(recved);
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLIN).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }

    /// 将buf的数据全部发送出去后才返回.
    pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<usize, Error> {
        let mut sended = 0;
        loop {
            let ret = unsafe { libc::write(self.fd.fd, const_buf(buf), buf.len()) };
            if ret >= 0 {
                let n = ret as usize;
                sended += n;
                if n == buf.len() {
                    return Ok(sended);
                }
                buf = &buf[n..];
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLOUT).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }

    /// 将数据全部发送出去后才返回.
    pub async fn sendfile_all(&mut self, in_fd: i32, off: usize, count: usize) -> Result<usize, Error> {
        let mut off = off as i64;
        let mut len = count;
        let end = off + count as i64;
        loop {
            let ret = unsafe { libc::sendfile(self.fd.fd, in_fd, &mut off, len) };
            if ret >= 0 {
                if off == end {
                    return Ok(count);
                }
                len -= ret as usize;
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLOUT).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }

    /// 发送至少一个字节的数据就返回.
    pub async fn sendfile(&mut self, in_fd: i32, off: usize, count: usize) -> Result<usize, Error> {
        let mut off = off as i64;
        loop {
            let ret = unsafe { libc::sendfile(self.fd.fd, in_fd, &mut off, count) };
            if ret >= 0 {
                return Ok(ret as usize);
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLOUT).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }

    /// 收到至少一个字节的数据就返回，如果返回Ok(0)说明对端断开连接.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        loop {
            let ret = unsafe { libc::read(self.fd.fd, mut_buf(buf), buf.len()) };
            if ret >= 0 {
                return Ok(ret as usize);
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLIN).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }

    /// 发送至少一个字节的数据就返回.
    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        loop {
            let ret = unsafe { libc::write(self.fd.fd, const_buf(buf), buf.len()) };
            if ret >= 0 {
                return Ok(ret as usize);
            } else {
                let e = Error::last_error();
                if e.errno == libc::EAGAIN {
                    self.wait(POLLOUT).await?;
                } else if e.errno != libc::EINTR {
                    return Err(e);
                }
            }
        }
    }
}

impl AioFd<'_> {
    fn event_handle(e: &Event, _events: u32, sched: &mut Scheduler) {
        let this = unsafe { container_of_mut!(e, Self, event) };
        if let Some(task) = this.waker.take() {
            task.clone().fast_wake(sched);
            this.task = Some(task);
        }
    }

    // 如果在唤醒之前错误的重入，会导致再次触发注册，效率会降低，功能没问题.
    fn set_task(&mut self, ctx: &mut Context<'_>) -> bool {
        let current = unsafe { task_from_ctx(ctx).as_mut() };
        if let Some(task) = self.task.take() {
            if ptr::eq(&*task, current) {
                self.waker = Some(task);
                return false;
            }
            // AioFd在task之间转移了, 首先取消注册
            self.del_event();
            self.events = 0;
        }
        //总是将task绑定到当前worker，不能切换, 可以和abort并发
        let worker = unsafe { worker_from_ctx(ctx).as_ref() };
        current.status.set_local(worker.worker_id());
        current.inc_ref();
        self.waker = Some(unsafe { TaskRef::from(current) });
        true
    }

    // 有以下关键场景需要解决
    // 1. 需要消除和JoinHandle::abort调用的Waker::wake的并发冲突
    // 2. 需要考虑AioFd被转移到新的Task并在新的Woker中被调度
    // 3. 需要考虑AioFd在Task间转移后可能导致注册IO事件的Event地址失效问题 
    //
    // 对应的解决方案:
    // 1. 需要始终持有当前Task的引用，确保注册IO事件的Event地址使用有效
    // 2. Task始终和Woker绑定
    // 3. 总是判断是否在Task间转移
    //
    // 说明:
    // 还有一种场景是在task内部发生了所有权转移，因为task都在堆上分配，转移后原内存地址仍然有效，当前rustc的实现也不会修改原内存空间的内容，因此无需重新注册也没有问题.
    // 当然最好的方式是能够判断是否在所有情况下都发生了转移，只是这样会多增加一个指针成员,
    // 暂时无需解决这个问题.
    fn add(&mut self, events: u32, ctx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        let first = self.set_task(ctx);
        if self.events == events {
            return Poll::Pending;
        }
        let sched = unsafe { sched_from_ctx(ctx).as_ref() };
        let ret = if !first {
            unsafe { sched.mod_fd_event(&self.event, events | POLLET, self.fd.fd) }
        } else {
            let ret = unsafe { sched.add_fd_event(&self.event, events | POLLET, self.fd.fd) };
            if ret.is_ok() {
                self.sched = Some(NonNull::from(sched));
            }
            ret
        };
        match ret {
            Ok(_) => {
                self.events = events;
                Poll::Pending
            }
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    fn del_event(&mut self) {
        if let Some(sched) = self.sched.take() {
            let _ = unsafe { sched.as_ref().del_fd_event(self.fd.fd) };
        }
    }
}

impl Drop for AioFd<'_> {
    fn drop(&mut self) {
        self.del_event();
    }
}

impl Deref for AioFd<'_> {
    type Target = Fd;
    fn deref(&self) -> &Self::Target {
        self.fd
    }
}

struct FdWait<'a, 'b> {
    aio: &'a mut AioFd<'b>,
    events: u32,
}

impl<'a, 'b> FdWait<'a, 'b> {
    fn new(aio: &'a mut AioFd<'b>, events: u32) -> Self {
        Self { aio, events }
    }
}

impl Future for FdWait<'_, '_> {
    type Output = Result<(), Error>;
    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.events > 0 {
            let events = self.events;
            self.events = 0;
            self.aio.add(events, ctx)
        } else {
            Poll::Ready(Ok(()))
        }
    }
}
